{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "import sys\n",
    "# 如果当前代码文件运行测试需要加入修改路径，避免出现后导包问题\n",
    "BASE_DIR = os.path.dirname(os.path.dirname(os.getcwd()))\n",
    "sys.path.insert(0, os.path.join(BASE_DIR))\n",
    "\n",
    "PYSPARK_PYTHON = \"/miniconda2/envs/reco_sys/bin/python\"\n",
    "# 当存在多个版本时，不指定很可能会导致出错\n",
    "os.environ[\"PYSPARK_PYTHON\"] = PYSPARK_PYTHON\n",
    "os.environ[\"PYSPARK_DRIVER_PYTHON\"] = PYSPARK_PYTHON\n",
    "\n",
    "from offline import SparkSessionBase\n",
    "from setting.default import channelInfo\n",
    "from pyspark.ml.feature import Word2Vec\n",
    "\n",
    "\n",
    "\n",
    "class TrainWord2VecModel(SparkSessionBase):\n",
    "\n",
    "    SPARK_APP_NAME = \"Word2Vec\"\n",
    "    SPARK_URL = \"yarn\"\n",
    "    \n",
    "    ENABLE_HIVE_SUPPORT = True\n",
    "\n",
    "    def __init__(self):\n",
    "        self.spark = self._create_spark_session()\n",
    "\n",
    "\n",
    "w2v = TrainWord2VecModel()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [],
   "source": [
    "# 训练一个频道的模型\n",
    "w2v.spark.sql(\"use article\")\n",
    "\n",
    "article_data = w2v.spark.sql(\"select * from article_data where article_id=18 limit 5\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [],
   "source": [
    "# 文章数据进行分词处理,得到分词结果\n",
    "# 分词\n",
    "def segmentation(partition):\n",
    "    import os\n",
    "    import re\n",
    "\n",
    "    import jieba\n",
    "    import jieba.analyse\n",
    "    import jieba.posseg as pseg\n",
    "    import codecs\n",
    "\n",
    "    abspath = \"/root/words\"\n",
    "\n",
    "    # 结巴加载用户词典\n",
    "    userDict_path = os.path.join(abspath, \"ITKeywords.txt\")\n",
    "    jieba.load_userdict(userDict_path)\n",
    "\n",
    "    # 停用词文本\n",
    "    stopwords_path = os.path.join(abspath, \"stopwords.txt\")\n",
    "\n",
    "    def get_stopwords_list():\n",
    "        \"\"\"返回stopwords列表\"\"\"\n",
    "        stopwords_list = [i.strip()\n",
    "                          for i in codecs.open(stopwords_path).readlines()]\n",
    "        return stopwords_list\n",
    "\n",
    "    # 所有的停用词列表\n",
    "    stopwords_list = get_stopwords_list()\n",
    "\n",
    "    # 分词\n",
    "    def cut_sentence(sentence):\n",
    "        \"\"\"对切割之后的词语进行过滤，去除停用词，保留名词，英文和自定义词库中的词，长度大于2的词\"\"\"\n",
    "        # print(sentence,\"*\"*100)\n",
    "        # eg:[pair('今天', 't'), pair('有', 'd'), pair('雾', 'n'), pair('霾', 'g')]\n",
    "        seg_list = pseg.lcut(sentence)\n",
    "        seg_list = [i for i in seg_list if i.flag not in stopwords_list]\n",
    "        filtered_words_list = []\n",
    "        for seg in seg_list:\n",
    "            # print(seg)\n",
    "            if len(seg.word) <= 1:\n",
    "                continue\n",
    "            elif seg.flag == \"eng\":\n",
    "                if len(seg.word) <= 2:\n",
    "                    continue\n",
    "                else:\n",
    "                    filtered_words_list.append(seg.word)\n",
    "            elif seg.flag.startswith(\"n\"):\n",
    "                filtered_words_list.append(seg.word)\n",
    "            elif seg.flag in [\"x\", \"eng\"]:  # 是自定一个词语或者是英文单词\n",
    "                filtered_words_list.append(seg.word)\n",
    "        return filtered_words_list\n",
    "\n",
    "    for row in partition:\n",
    "        sentence = re.sub(\"<.*?>\", \"\", row.sentence)    # 替换掉标签数据\n",
    "        words = cut_sentence(sentence)\n",
    "        yield row.article_id, row.channel_id, words"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [],
   "source": [
    "words_df = article_data.rdd.mapPartitions(segmentation).toDF(['article_id', 'channel_id', 'words'])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----------+----------+--------------------+\n",
      "|article_id|channel_id|               words|\n",
      "+----------+----------+--------------------+\n",
      "|        18|        17|[web, pa, react, ...|\n",
      "+----------+----------+--------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "words_df.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {},
   "outputs": [],
   "source": [
    "# 直接diaoyong调用word2vec训练\n",
    "w2v_model = Word2Vec(vectorSize=100, inputCol='words', outputCol='model', minCount=3)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "model = w2v_model.fit(words_df)\n",
    "model.save(\"hdfs://hadoop-master:9000/headlines/models/test.word2vec\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {},
   "outputs": [],
   "source": [
    "# 1、加载某个频道模型，得到每个词的向量\n",
    "from pyspark.ml.feature import Word2VecModel\n",
    "wv = Word2VecModel.load(\"hdfs://hadoop-master:9000/headlines/models/word2vec_model/channel_18_python.word2vec\")\n",
    "vectors = wv.getVectors()\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------------+--------------------+\n",
      "|              word|              vector|\n",
      "+------------------+--------------------+\n",
      "|                广义|[0.28907623887062...|\n",
      "|                钟爱|[-0.0529650673270...|\n",
      "|c1c3387c24028915fc|[0.08250344544649...|\n",
      "|          failCnt0|[-0.0034321683924...|\n",
      "|       freeman1974|[0.01132440567016...|\n",
      "|                伙伴|[-0.1075697541236...|\n",
      "|  testStationarity|[0.09605087339878...|\n",
      "|                箭头|[0.08957882970571...|\n",
      "|        fieldsfrom|[-0.0121747571974...|\n",
      "|      RoundrobinLB|[-0.0941602289676...|\n",
      "|              COCO|[-0.2620599269866...|\n",
      "|                拜拜|[-0.0820834264159...|\n",
      "|          quotient|[0.08328679203987...|\n",
      "|                货币|[-0.0276580695062...|\n",
      "|                人物|[-0.1581292450428...|\n",
      "|               wsy|[0.09347543865442...|\n",
      "|           serious|[0.01220745686441...|\n",
      "|               跨进程|[-0.0330988727509...|\n",
      "|        fromParams|[0.00353708816692...|\n",
      "|        MongoDB数据库|[-0.1521783322095...|\n",
      "+------------------+--------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "vectors.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----------+----------+--------+-------------------+\n",
      "|article_id|channel_id| keyword|             weight|\n",
      "+----------+----------+--------+-------------------+\n",
      "|     13098|        18|    repr| 0.6326590117716192|\n",
      "|     13098|        18|      __| 2.5401122038114203|\n",
      "|     13098|        18|      属性|0.23645924932468856|\n",
      "|     13098|        18|     pre| 0.6040062287555379|\n",
      "|     13098|        18|    code| 0.9531379029975557|\n",
      "|     13098|        18|     def| 0.5063435861497416|\n",
      "|     13098|        18|   color| 1.1337936117177925|\n",
      "|     13098|        18|      定义| 0.1554380122061322|\n",
      "|     13098|        18| Student| 0.5033771372284416|\n",
      "|     13098|        18|getPrice| 0.7404427038950527|\n",
      "|     13098|        18|      方法|0.08080845613717194|\n",
      "|     13098|        18|     div| 0.3434819820586186|\n",
      "|     13098|        18|     str|0.35999033790156054|\n",
      "|     13098|        18|      pa| 0.6651385256756351|\n",
      "|     13098|        18|   slots| 0.6992789472129189|\n",
      "|     13098|        18| cnblogs|0.33926586102013295|\n",
      "|     13098|        18|      函数|0.15015578405898256|\n",
      "|     13098|        18|   style| 2.4777013955852873|\n",
      "|     13098|        18|      &#| 0.4911011561534254|\n",
      "|     13098|        18|   class|0.28891320463243075|\n",
      "+----------+----------+--------+-------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "\n",
    "#2、获取频道的文章画像，得到文章画像的关键词(接着之前增量更新的文章article_profile)\n",
    "# 获取这些文章20个关键词名称，对应名称找到词向量\n",
    "article_profile = w2v.spark.sql(\"select * from article_profile where channel_id=18 limit 10\")\n",
    "\n",
    "#3、计算得到文章每个词的向量\n",
    "article_profile.registerTempTable('profile')\n",
    "keyword_weight = w2v.spark.sql(\"select article_id, channel_id, keyword, weight from profile LATERAL VIEW explode(keywords) AS keyword, weight\")\n",
    "keyword_weight.show()\n",
    "\n",
    "\n",
    "#4、计算得到文章的平均词向量即文章的向量"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 30,
   "metadata": {},
   "outputs": [],
   "source": [
    "# 合并文章关键词与词向量\n",
    "_keywords_vector = keyword_weight.join(vectors, vectors.word==keyword_weight.keyword, 'inner')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 31,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----------+----------+-------+-------------------+------+--------------------+\n",
      "|article_id|channel_id|keyword|             weight|  word|              vector|\n",
      "+----------+----------+-------+-------------------+------+--------------------+\n",
      "|     12936|        18| strong|  4.705249850191452|strong|[0.06607607007026...|\n",
      "|     12936|        18| python| 1.5221131438694515|python|[-0.0696719884872...|\n",
      "|     12936|        18|   bool|  1.698618363235006|  bool|[0.08196849375963...|\n",
      "|     12936|        18|   编程语言| 1.0816836469752635|  编程语言|[-0.1638689935207...|\n",
      "|     12936|        18|   True| 1.1097628840375606|  True|[0.24163006246089...|\n",
      "|     12936|        18|     遗漏|   1.63385196709268|    遗漏|[-0.2000092417001...|\n",
      "|     12936|        18|   elif| 2.0815467371010805|  elif|[0.23405943810939...|\n",
      "|     12936|        18|    int|   0.69289212656367|   int|[-0.0279460791498...|\n",
      "|     12936|        18|    str| 1.1603775123519366|   str|[-0.0372486524283...|\n",
      "|     12936|        18|     pa|0.36743860272647505|    pa|[0.12457559257745...|\n",
      "|     12936|        18|     单词|  1.406335008823204|    单词|[-0.0355367287993...|\n",
      "|     12936|        18|     数据| 0.2990961325092939|    数据|[0.14358098804950...|\n",
      "|     12936|        18|     过程| 0.4050493263937482|    过程|[-0.0100963125005...|\n",
      "|     12936|        18|   if语句|  1.957626628186806|  if语句|[0.21955621242523...|\n",
      "|     12936|        18|     大写| 1.7154369266840954|    大写|[0.03721550852060...|\n",
      "|     12936|        18|  style| 1.6059537337711527| style|[0.25782978534698...|\n",
      "|     12936|        18|   命名规范| 1.9782233064408097|  命名规范|[0.06696650385856...|\n",
      "|     12936|        18|     字母| 1.2148323436339508|    字母|[0.24973358213901...|\n",
      "|     12936|        18|     重点| 0.9063778617582984|    重点|[0.06429161876440...|\n",
      "|     12936|        18|   font| 1.7792174100662055|  font|[0.66760659217834...|\n",
      "+----------+----------+-------+-------------------+------+--------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "_keywords_vector.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 32,
   "metadata": {},
   "outputs": [],
   "source": [
    "def compute_vector(row):\n",
    "    return row.article_id, row.channel_id, row.keyword, row.weight * row.vector\n",
    "articleKeywordVectors = _keywords_vector.rdd.map(compute_vector).toDF([\"article_id\", \"channel_id\", \"keyword\", \"weightingVector\"])\n",
    "\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 33,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----------+----------+--------+--------------------+\n",
      "|article_id|channel_id| keyword|     weightingVector|\n",
      "+----------+----------+--------+--------------------+\n",
      "|     13098|        18|    repr|[0.13308673769053...|\n",
      "|     13098|        18|      __|[0.03018926765933...|\n",
      "|     13098|        18|      属性|[-0.0191257052454...|\n",
      "|     13098|        18|     pre|[0.34502730264365...|\n",
      "|     13098|        18|    code|[0.34601303844503...|\n",
      "|     13098|        18|     def|[0.07761504849378...|\n",
      "|     13098|        18|   color|[0.61312345712161...|\n",
      "|     13098|        18|      定义|[-0.0010762159703...|\n",
      "|     13098|        18| Student|[0.09441257176805...|\n",
      "|     13098|        18|getPrice|[-0.0847735848446...|\n",
      "|     13098|        18|      方法|[-0.0048283701284...|\n",
      "|     13098|        18|     div|[0.04037546778136...|\n",
      "|     13098|        18|     str|[-0.0134091549740...|\n",
      "|     13098|        18|      pa|[0.08286002598213...|\n",
      "|     13098|        18|   slots|[-0.2270685226558...|\n",
      "|     13098|        18| cnblogs|[0.02879135118511...|\n",
      "|     13098|        18|      函数|[0.01213366982724...|\n",
      "|     13098|        18|   style|[0.63882521897767...|\n",
      "|     13098|        18|      &#|[-0.0073760822316...|\n",
      "|     13098|        18|   class|[-0.0168053401172...|\n",
      "+----------+----------+--------+--------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "articleKeywordVectors.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 34,
   "metadata": {},
   "outputs": [],
   "source": [
    "# 4、计算得到文章的平均词向量即文章的向量\n",
    "articleKeywordVectors.registerTempTable('temptable')\n",
    "articleKeywordVectors = w2v.spark.sql(\"select article_id, min(channel_id) channel_id, collect_set(weightingVector) vectors from temptable group by article_id\")\n",
    "\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 35,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----------+----------+--------------------+\n",
      "|article_id|channel_id|             vectors|\n",
      "+----------+----------+--------------------+\n",
      "|     13098|        18|[[0.6131234571216...|\n",
      "|     13248|        18|[[2.5336782538801...|\n",
      "|     13401|        18|[[0.1199601801400...|\n",
      "|     13723|        18|[[0.0767033252676...|\n",
      "|     14719|        18|[[-0.091019116457...|\n",
      "|     14846|        18|[[-0.069822823712...|\n",
      "|     15173|        18|[[-0.359728582063...|\n",
      "|     15194|        18|[[-0.006211698526...|\n",
      "|     15237|        18|[[-0.023426829636...|\n",
      "|     15322|        18|[[0.0780172035455...|\n",
      "+----------+----------+--------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "articleKeywordVectors.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 36,
   "metadata": {},
   "outputs": [],
   "source": [
    "# 求平均值\n",
    "def compute_avg_vectors(row):\n",
    "    x = 0\n",
    "    for i in row.vectors:\n",
    "        x += i\n",
    "    \n",
    "    # 求平均值\n",
    "    return row.article_id, row.channel_id, x / len(row.vectors)\n",
    "\n",
    "article_vector = articleKeywordVectors.rdd.map(compute_avg_vectors).toDF(['article_id', 'channel_id', 'vector'])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 37,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----------+----------+--------------------+\n",
      "|article_id|channel_id|              vector|\n",
      "+----------+----------+--------------------+\n",
      "|     12936|        18|[0.15785672885486...|\n",
      "|     13206|        18|[0.36658417091938...|\n",
      "|     14029|        18|[0.08382564595017...|\n",
      "|     14259|        18|[-0.1518681660977...|\n",
      "|     14805|        18|[0.11028526511434...|\n",
      "|     15921|        18|[0.10679691438887...|\n",
      "|     17370|        18|[0.08871408187970...|\n",
      "|     17595|        18|[0.21126698350251...|\n",
      "|     18026|        18|[0.32436757418235...|\n",
      "|     18117|        18|[-0.1335141347559...|\n",
      "+----------+----------+--------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "article_vector.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 38,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "DataFrame[article_id: bigint, channel_id: bigint, vector: vector]"
      ]
     },
     "execution_count": 38,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "article_vector"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 39,
   "metadata": {},
   "outputs": [],
   "source": [
    "def toArray(row):\n",
    "    return row.article_id, row.channel_id, [float(i) for i in row.vector.toArray()]\n",
    "\n",
    "article_vector = article_vector.rdd.map(toArray).toDF(['article_id', 'channel_id', 'vector'])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 40,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "DataFrame[article_id: bigint, channel_id: bigint, vector: array<double>]"
      ]
     },
     "execution_count": 40,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "article_vector"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 42,
   "metadata": {},
   "outputs": [],
   "source": [
    "# 1、拿到Python频道的所有文章数据，10片测试\n",
    "from pyspark.ml.linalg import Vectors\n",
    "\n",
    "def toVector(row):\n",
    "    return row.article_id, Vectors.dense(row.vector)\n",
    "train = article_vector.rdd.map(toVector).toDF(['article_id', 'vector'])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 43,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "DataFrame[article_id: bigint, vector: vector]"
      ]
     },
     "execution_count": 43,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "train"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 44,
   "metadata": {},
   "outputs": [],
   "source": [
    "# 计算相似的文章\n",
    "from pyspark.ml.feature import BucketedRandomProjectionLSH\n",
    "brp = BucketedRandomProjectionLSH(inputCol='vector', outputCol='hashes', numHashTables=4.0, bucketLength=10.0)\n",
    "model = brp.fit(train)\n",
    "similar = model.approxSimilarityJoin(train, train, 2.0, distCol='EuclideanDistance')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 45,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+--------------------+------------------+\n",
      "|            datasetA|            datasetB| EuclideanDistance|\n",
      "+--------------------+--------------------+------------------+\n",
      "|[17595,[0.2112669...|[17595,[0.2112669...|               0.0|\n",
      "|[12936,[0.1578567...|[17370,[0.0887140...| 1.361573657055773|\n",
      "|[15921,[0.1067969...|[14805,[0.1102852...|0.7717752375745968|\n",
      "|[15921,[0.1067969...|[12936,[0.1578567...|1.5205099193938123|\n",
      "|[15921,[0.1067969...|[14259,[-0.151868...|  1.86883894754822|\n",
      "|[13206,[0.3665841...|[13206,[0.3665841...|               0.0|\n",
      "|[14029,[0.0838256...|[17595,[0.2112669...|1.9271039464531845|\n",
      "|[12936,[0.1578567...|[12936,[0.1578567...|               0.0|\n",
      "|[12936,[0.1578567...|[18117,[-0.133514...|1.8511391009238651|\n",
      "|[18117,[-0.133514...|[18117,[-0.133514...|               0.0|\n",
      "|[18117,[-0.133514...|[14805,[0.1102852...|1.4227262480154705|\n",
      "|[17370,[0.0887140...|[15921,[0.1067969...|0.8802763327501671|\n",
      "|[14029,[0.0838256...|[14029,[0.0838256...|               0.0|\n",
      "|[17595,[0.2112669...|[14029,[0.0838256...|1.9271039464531845|\n",
      "|[14805,[0.1102852...|[12936,[0.1578567...|1.3378357895763942|\n",
      "|[14805,[0.1102852...|[14029,[0.0838256...|1.8215851677420467|\n",
      "|[17370,[0.0887140...|[14029,[0.0838256...|1.9964285103159445|\n",
      "|[14259,[-0.151868...|[17370,[0.0887140...|1.8488428360491858|\n",
      "|[14805,[0.1102852...|[18117,[-0.133514...|1.4227262480154705|\n",
      "|[18117,[-0.133514...|[17370,[0.0887140...|1.5285416639087503|\n",
      "+--------------------+--------------------+------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "similar.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 46,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "DataFrame[datasetA: struct<article_id:bigint,vector:vector,hashes:array<vector>>, datasetB: struct<article_id:bigint,vector:vector,hashes:array<vector>>, EuclideanDistance: double]"
      ]
     },
     "execution_count": 46,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "similar"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def save_hbase(partitions):\n",
    "    import happybase\n",
    "    pool = happybase.ConnectionPool(size=3, host='hadoop-master')\n",
    "    \n",
    "    with pool.connection() as conn:\n",
    "        article_similar = conn.table('article_similar')\n",
    "        for row in partitions:\n",
    "            if row.datasetA.article_id == row.datasetB.article_id:\n",
    "                pass\n",
    "            else:\n",
    "                article_similar.put(str(row.datasetA.article_id).encode(),\n",
    "                                   {'similar:{}'.format(row.datasetB.article_id).encode(): b'%0.4f' % (row.EuclideanDistance)})\n",
    "\n",
    "similar.foreachPartition(save_hbase)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
